// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*************************************************************************
 * @file FlowControlExamplePublisher.cpp
 * This file contains the implementation of the publisher functions.
 *
 * This file was generated by the tool fastcdrgen.
 */

#include "FlowControlExamplePublisher.h"

#include <chrono>
#include <thread>

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>

using namespace eprosima::fastdds::dds;
using namespace eprosima::fastrtps::rtps;

FlowControlExamplePublisher::FlowControlExamplePublisher()
    : participant_(nullptr)
    , fast_publisher_(nullptr)
    , slow_publisher_(nullptr)
    , topic_(nullptr)
    , fast_writer_(nullptr)
    , slow_writer_(nullptr)
    , myType(new FlowControlExamplePubSubType())
{
}

/**
 * Tears down whatever init() managed to create.
 *
 * Each entity is removed through the entity that created it, and only if its
 * pointer is non-null: first both DataWriters, then both Publishers, then the
 * Topic. The participant pointer is finally handed to the factory for
 * deletion regardless of its value.
 */
FlowControlExamplePublisher::~FlowControlExamplePublisher()
{
    // DataWriters are deleted through their owning publishers.
    if (nullptr != fast_writer_)
    {
        fast_publisher_->delete_datawriter(fast_writer_);
    }

    if (nullptr != slow_writer_)
    {
        slow_publisher_->delete_datawriter(slow_writer_);
    }

    // Publishers are deleted through the participant.
    if (nullptr != fast_publisher_)
    {
        participant_->delete_publisher(fast_publisher_);
    }

    if (nullptr != slow_publisher_)
    {
        participant_->delete_publisher(slow_publisher_);
    }

    // The topic is deleted once the writers that used it are gone.
    if (nullptr != topic_)
    {
        participant_->delete_topic(topic_);
    }

    // Finally hand the participant back to the factory.
    DomainParticipantFactory* const factory = DomainParticipantFactory::get_instance();
    factory->delete_participant(participant_);
}

bool FlowControlExamplePublisher::init()
{
    // Create Participant
    DomainParticipantQos pqos;
    pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
    pqos.name("Participant_publisher");  //You can put here the name you want

    participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

    if (participant_ == nullptr)
    {
        return false;
    }

    //Register the type
    myType.register_type(participant_);

    // Create fast Publisher, which has no controller of its own.
    fast_publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT);

    if (fast_publisher_ == nullptr)
    {
        return false;
    }

    // Create Topic
    topic_ = participant_->create_topic("FlowControlExamplePubSubTopic", myType.get_type_name(), TOPIC_QOS_DEFAULT);

    if (topic_ == nullptr)
    {
        return false;
    }

    // Create fast DataWriter
    DataWriterQos wfqos;
    wfqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE;

    fast_writer_ = fast_publisher_->create_datawriter(topic_, wfqos, &m_listener);

    if (fast_writer_ == nullptr)
    {
        return false;
    }
    std::cout << "Fast publisher created, waiting for Subscribers." << std::endl;

    // Create slow Publisher, with its own controller
    slow_publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT);

    if (slow_publisher_ == nullptr)
    {
        return false;
    }

    // Create slow DataWriter
    DataWriterQos wsqos;
    wsqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE;

    // This controller allows 300kb per second.
    ThroughputControllerDescriptor slowPublisherThroughputController{300000, 1000};
    wsqos.throughput_controller() = slowPublisherThroughputController;

    slow_writer_ = slow_publisher_->create_datawriter(topic_, wsqos, &m_listener);

    if (slow_writer_ == nullptr)
    {
        return false;
    }
    std::cout << "Slow publisher created, waiting for Subscribers." << std::endl;
    return true;
}

/**
 * Callback invoked when the match state of a DataWriter using this listener
 * changes.
 *
 * On a match (+1) or an unmatch (-1) the listener stores the number of
 * readers currently matched, as reported in the status, and prints a
 * message. Any other change value is reported as invalid and n_matched is
 * left untouched.
 *
 * The stored value comes from `current_count` rather than `total_count`:
 * `total_count` is a cumulative counter of matches and therefore never
 * decreases when a reader goes away, which would leave n_matched stale after
 * an unmatch.
 *
 * Note that init() installs this same listener on both writers, so n_matched
 * reflects the writer that triggered the most recent callback.
 *
 * NOTE(review): n_matched is polled from run() while it is written here;
 * confirm whether the callback runs on another thread and whether the member
 * should be atomic.
 *
 * @param info Matched status delivered with the notification. The DataWriter
 *             argument is unused.
 */
void FlowControlExamplePublisher::PubListener::on_publication_matched(
        eprosima::fastdds::dds::DataWriter*,
        const eprosima::fastdds::dds::PublicationMatchedStatus& info)
{
    if (info.current_count_change == 1)
    {
        // A reader was matched: record how many are matched right now.
        n_matched = info.current_count;
        std::cout << "Publisher matched." << std::endl;
    }
    else if (info.current_count_change == -1)
    {
        // A reader was unmatched: current_count has already been decreased.
        n_matched = info.current_count;
        std::cout << "Publisher unmatched." << std::endl;
    }
    else
    {
        std::cout << info.current_count_change
                  << " is not a valid value for PublicationMatchedStatus current count change" << std::endl;
    }
}

void FlowControlExamplePublisher::run()
{
    while (m_listener.n_matched == 0)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
    }

    // Publication code

    FlowControlExample st;

    /* Initialize your structure here */

    int msgsent_fast = 0;
    int msgsent_slow = 0;
    char ch;
    std::cout << "Flow Control example." << std::endl;
    std::cout << "Press \"f\" to send a sample through the fast writer, which has unlimited bandwidth" << std::endl;
    std::cout <<
        "Press \"s\" to send a sample through the slow writer, which is also limited by its own Flow Controller" <<
        std::endl;
    std::cout << "Press \"q\" quit" << std::endl;
    while (std::cin >> ch)
    {
        if (ch == 'f')
        {
            st.wasFast(true);
            fast_writer_->write(&st);  ++msgsent_fast;
            std::cout << "Sending sample, count=" << msgsent_fast <<
                " through the fast writer. Send another sample? (f-fast,s-slow,q-quit): ";
        }
        else if (ch == 's')
        {
            st.wasFast(false);
            slow_writer_->write(&st);  ++msgsent_slow;
            std::cout << "Sending sample, count=" << msgsent_slow <<
                " through the slow writer. Send another sample? (f-fast,s-slow,q-quit): ";
        }
        else if (ch == 'q')
        {
            std::cout << "Finishing Flow Control example" << std::endl;
            break;
        }
        else
        {
            std::cout << "Command " << ch << " not recognized, please enter \"f/s/q\":";
        }

    }
}
